// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hanchuanchuan/goInception/ast"
	"github.com/hanchuanchuan/goInception/ddl"
	"github.com/hanchuanchuan/goInception/domain"
	"github.com/hanchuanchuan/goInception/executor"
	"github.com/hanchuanchuan/goInception/infoschema"
	"github.com/hanchuanchuan/goInception/kv"
	"github.com/hanchuanchuan/goInception/model"
	"github.com/hanchuanchuan/goInception/parser"
	"github.com/hanchuanchuan/goInception/session"
	"github.com/hanchuanchuan/goInception/sessionctx"
	"github.com/hanchuanchuan/goInception/store/mockstore"
	"github.com/hanchuanchuan/goInception/terror"
	"github.com/hanchuanchuan/goInception/util/admin"
	"github.com/hanchuanchuan/goInception/util/testkit"
	"github.com/hanchuanchuan/goInception/util/testleak"
	. "github.com/pingcap/check"
	"github.com/pingcap/errors"
	"golang.org/x/net/context"
)

// Register testStateChangeSuite with the gocheck runner.
var _ = Suite(&testStateChangeSuite{})

// testStateChangeSuite holds the fixtures shared by every test in this file.
// All fields are populated by SetUpSuite:
//   - lease: schema lease handed to session.SetSchemaLease.
//   - store: mock TiKV storage that every session in the suite is created on.
//   - dom: domain returned by session.BootstrapSession; tests install hooks on dom.DDL().
//   - se: session whose current database is test_db_state.
//   - p: parser used to parse the statements under test.
type testStateChangeSuite struct {
	lease time.Duration
	store kv.Storage
	dom   *domain.Domain
	se    session.Session
	p     *parser.Parser
}

// SetUpSuite creates the mock store, bootstraps a domain on it with a 200ms
// schema lease, and opens the suite session with test_db_state (created here)
// as its current database.
func (s *testStateChangeSuite) SetUpSuite(c *C) {
	testleak.BeforeTest()
	s.lease = 200 * time.Millisecond
	ddl.WaitTimeWhenErrorOccured = 1 * time.Microsecond

	var err error
	s.store, err = mockstore.NewMockTikvStore()
	c.Assert(err, IsNil)

	session.SetSchemaLease(s.lease)
	s.dom, err = session.BootstrapSession(s.store)
	c.Assert(err, IsNil)

	s.se, err = session.CreateSession4Test(s.store)
	c.Assert(err, IsNil)

	// Create the suite database and switch the suite session to it.
	for _, stmt := range []string{"create database test_db_state", "use test_db_state"} {
		_, err = s.se.Execute(context.Background(), stmt)
		c.Assert(err, IsNil)
	}
	s.p = parser.New()
}

// TearDownSuite drops the suite database, closes the session, the domain and
// the store in that order, and finally invokes the function returned by
// testleak.AfterTest.
func (s *testStateChangeSuite) TearDownSuite(c *C) {
	// Cleanup is best effort: the outcome of the drop is deliberately discarded.
	_, _ = s.se.Execute(context.Background(), "drop database if exists test_db_state")
	s.se.Close()
	s.dom.Close()
	s.store.Close()

	afterTest := testleak.AfterTest(c)
	afterTest()
}

// TestShowCreateTable tests the result of "show create table" when we are running "add index" or "add column".
// While either job is in a non-public schema state, the object being added
// must not yet appear in the statement's output.
func (s *testStateChangeSuite) TestShowCreateTable(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("create table t (id int, index idx (id))")

	var checkErr error
	// prevState is never reassigned, so the first condition below only filters
	// out updates whose schema state is StateNone.
	prevState := model.StateNone
	callback := &ddl.TestDDLCallback{}
	callback.OnJobUpdatedExported = func(job *model.Job) {
		if job.SchemaState == prevState || checkErr != nil {
			return
		}
		if job.SchemaState == model.StatePublic {
			return
		}
		result := tk.MustQuery("show create table t")
		got := result.Rows()[0][1]
		var expected string
		switch job.Type {
		case model.ActionAddIndex:
			// idx1 is still being added and must stay hidden.
			expected = "CREATE TABLE `t` (\n  `id` int(11) DEFAULT NULL,\n  KEY `idx` (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin"
		case model.ActionAddColumn:
			// idx1 is public by now; column c is still being added and must stay hidden.
			expected = "CREATE TABLE `t` (\n  `id` int(11) DEFAULT NULL,\n  KEY `idx` (`id`),\n  KEY `idx1` (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin"
		}
		if got != expected {
			checkErr = errors.Errorf("got %s, expected %s", got, expected)
		}
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)
	// Restore a no-op hook when the test ends, as the other tests in this suite
	// do, so that later DDL jobs do not run a closure that still holds this
	// test's tk.
	defer d.(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})

	tk.MustExec("alter table t add index idx1(id)")
	c.Assert(checkErr, IsNil)
	tk.MustExec("alter table t add column c int")
	c.Assert(checkErr, IsNil)
}

// TestTwoStates runs four DML statements against table t while column d3 is
// being added. Each statement gets five stateCases (one per execution slot used
// by test), each with its own session.
func (s *testStateChangeSuite) TestTwoStates(c *C) {
	const caseCnt = 5
	stmts := []string{
		"insert into t (c1, c2, c3, c4) value(2, 'b', 'N', '2017-07-02')",
		"insert into t (c1, c2, c3, d3, c4) value(3, 'b', 'N', 'a', '2017-07-03')",
		"update t set c2 = 'c2_update'",
		"replace into t values(5, 'e', 'N', '2017-07-05')",
	}

	// Build the testExecInfo: one sqlInfo per statement, caseCnt cases each.
	testInfo := &testExecInfo{execCases: caseCnt}
	for _, stmt := range stmts {
		info := &sqlInfo{sql: stmt, cases: make([]*stateCase, caseCnt)}
		for j := range info.cases {
			info.cases[j] = new(stateCase)
		}
		testInfo.sqlInfos = append(testInfo.sqlInfos, info)
	}
	c.Assert(testInfo.createSessions(s.store, "test_db_state"), IsNil)

	// The insert that names d3 is expected to fail compilation in cases 0-3.
	unknownColErr := errors.New("unknown column d3")
	for _, sc := range testInfo.sqlInfos[1].cases[:4] {
		sc.expectedCompileErr = unknownColErr
	}
	// The four-value replace is expected to fail compilation in case 4.
	testInfo.sqlInfos[3].cases[4].expectedCompileErr = errors.New("Column count doesn't match value count at row 1")

	s.test(c, "", "alter table t add column d3 enum('a', 'b') not null default 'a' after c3", testInfo)
	// TODO: Add more DDL statements.
}

// test creates table t with one row, parses the statements in testInfo, and
// then runs alterTableSQL while a DDL hook compiles and executes those
// statements at specific schema states of the job:
//   - delete only: compile and execute case 0;
//   - write only: compile case 1 (it is executed later);
//   - write reorganization: compile and execute case 2, execute case 1, then
//     compile case 3.
//
// Once the DDL statement has returned, case 4 is compiled and executed and
// case 3 is executed. The first error seen inside the hook is stored in
// checkErr and asserted at the end. tableName is not used.
func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testInfo *testExecInfo) {
	_, err := s.se.Execute(context.Background(), `create table t (
		c1 int,
		c2 varchar(64),
		c3 enum('N','Y') not null default 'N',
		c4 timestamp on update current_timestamp,
		key(c1, c2))`)
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table t")
	_, err = s.se.Execute(context.Background(), "insert into t values(1, 'a', 'N', '2017-07-01')")
	c.Assert(err, IsNil)

	callback := &ddl.TestDDLCallback{}
	// prevState is never reassigned, so the hook's first condition only filters
	// out updates whose schema state is StateNone.
	prevState := model.StateNone
	var checkErr error
	err = testInfo.parseSQLs(s.p)
	// NOTE(review): the comment formats the parser s.p, not the failing SQL text.
	c.Assert(err, IsNil, Commentf("(sql:%v) error stack %v", s.p, errors.ErrorStack(err)))
	// times caps the number of hook invocations that do any work at three.
	times := 0
	// NOTE(review): the hook assigns to this function's err variable. If the hook
	// runs on a different goroutine than this function, that is a data race — confirm.
	callback.OnJobUpdatedExported = func(job *model.Job) {
		if job.SchemaState == prevState || checkErr != nil || times >= 3 {
			return
		}
		times++
		switch job.SchemaState {
		case model.StateDeleteOnly:
			// In this state every sqlInfo is compiled and executed once using its first case.
			err = testInfo.compileSQL(0)
			if err != nil {
				checkErr = err
				break
			}
			err = testInfo.execSQL(0)
			if err != nil {
				checkErr = err
			}
		case model.StateWriteOnly:
			// In this state the second case is only compiled; it is executed
			// during write reorganization below.
			err = testInfo.compileSQL(1)
			if err != nil {
				checkErr = err
			}
		case model.StateWriteReorganization:
			// In this state every sqlInfo is compiled and executed once using its third case.
			err = testInfo.compileSQL(2)
			if err != nil {
				checkErr = err
				break
			}
			err = testInfo.execSQL(2)
			if err != nil {
				checkErr = err
				break
			}
			// Mock a server that is still in the `write only` state: run the
			// statements compiled for case 1.
			err = testInfo.execSQL(1)
			if err != nil {
				checkErr = err
				break
			}
			// The fourth case is only compiled here; it is executed after the DDL returns.
			err = testInfo.compileSQL(3)
			if err != nil {
				checkErr = err
			}
		}
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)
	_, err = s.se.Execute(context.Background(), alterTableSQL)
	c.Assert(err, IsNil)
	// The DDL statement has returned: compile and execute the fifth case.
	err = testInfo.compileSQL(4)
	c.Assert(err, IsNil)
	err = testInfo.execSQL(4)
	c.Assert(err, IsNil)
	// Mock a server that is still in the `write reorg` state: run the
	// statements compiled for case 3.
	err = testInfo.execSQL(3)
	c.Assert(err, IsNil)
	c.Assert(errors.ErrorStack(checkErr), Equals, "")
	// Replace the hook with an empty callback so later tests are unaffected.
	callback = &ddl.TestDDLCallback{}
	d.(ddl.DDLForTest).SetHook(callback)
}

// stateCase is one execution slot of a SQL statement.
// session is the session the statement is compiled and executed in, rawStmt is
// the parsed statement (set by parseSQLs) and stmt is the compiled statement
// (set by compileSQL). expectedCompileErr and expectedExecErr, when non-nil,
// are matched as substrings of the actual compile / execute error; a case with
// an expected compile error is skipped by execSQL.
type stateCase struct {
	session            session.Session
	rawStmt            ast.StmtNode
	stmt               ast.Statement
	expectedExecErr    error
	expectedCompileErr error
}

// sqlInfo pairs one SQL statement with the stateCases it is run under.
type sqlInfo struct {
	// sql is the text of the statement.
	sql string
	// cases holds multiple stateCases.
	// Every case needs to be executed under a different schema state.
	cases []*stateCase
}

// testExecInfo contains some SQL information and the number of times each SQL is executed
// in a DDL statement.
type testExecInfo struct {
	// execCases is the number of times every SQL needs to be executed;
	// the schema state is different at each execution.
	// parseSQLs parses the first execCases cases of every sqlInfo.
	execCases int
	// sqlInfos lists the SQL statements covered by this test information.
	sqlInfos []*sqlInfo
}

// createSessions gives every stateCase of every sqlInfo its own session on
// store and switches that session to the useDB database. It returns the first
// error encountered, traced.
func (t *testExecInfo) createSessions(store kv.Storage, useDB string) error {
	useStmt := "use " + useDB
	var err error
	for i := range t.sqlInfos {
		for j, sc := range t.sqlInfos[i].cases {
			if sc.session, err = session.CreateSession4Test(store); err != nil {
				return errors.Trace(err)
			}
			if _, err = sc.session.Execute(context.Background(), useStmt); err != nil {
				return errors.Trace(err)
			}
			// The connection ID encodes (sqlInfo index, case index); it is only
			// used for debugging.
			sc.session.SetConnectionID(uint64(i*10 + j))
		}
	}
	return nil
}

// parseSQLs parses every sqlInfo's statement once for each of its first
// execCases cases and stores the result in the case's rawStmt. The charset and
// collation come from the session of the sqlInfo's first case. It does nothing
// when execCases is not positive.
func (t *testExecInfo) parseSQLs(p *parser.Parser) error {
	if t.execCases <= 0 {
		return nil
	}
	var err error
	for _, info := range t.sqlInfos {
		charset, collation := info.cases[0].session.GetSessionVars().GetCharsetInfo()
		for j := 0; j < t.execCases; j++ {
			if info.cases[j].rawStmt, err = p.ParseOneStmt(info.sql, charset, collation); err != nil {
				return errors.Trace(err)
			}
		}
	}
	return nil
}

// compileSQL compiles case idx of every sqlInfo in that case's own session and
// stores the compiled statement in the case. When the case carries an
// expectedCompileErr, a compile error containing its message counts as success
// and a successful compile counts as failure. The first failure is returned,
// traced.
func (t *testExecInfo) compileSQL(idx int) (err error) {
	for _, info := range t.sqlInfos {
		sc := info.cases[idx]
		ctx := context.TODO()
		compiler := executor.Compiler{Ctx: sc.session}

		sc.session.PrepareTxnCtx(ctx)
		err = executor.ResetContextOfStmt(sc.session.(sessionctx.Context), sc.rawStmt)
		if err != nil {
			return errors.Trace(err)
		}

		sc.stmt, err = compiler.Compile(ctx, sc.rawStmt)
		switch want := sc.expectedCompileErr; {
		case want == nil:
			// Nothing expected: any compile error is reported as is.
		case err == nil:
			err = errors.Errorf("expected error %s but got nil", want)
		case strings.Contains(err.Error(), want.Error()):
			err = nil
		}
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// execSQL executes the compiled statement of case idx of every sqlInfo and
// commits that case's session afterwards. Cases with an expectedCompileErr are
// skipped. When the case carries an expectedExecErr, an execution error
// containing its message counts as success and a successful execution counts
// as failure. The first failure is returned, traced.
func (t *testExecInfo) execSQL(idx int) error {
	for _, info := range t.sqlInfos {
		sc := info.cases[idx]
		if sc.expectedCompileErr != nil {
			continue
		}

		_, err := sc.stmt.Exec(context.TODO())
		switch want := sc.expectedExecErr; {
		case want == nil:
			// Nothing expected: any execution error is reported as is.
		case err == nil:
			err = errors.Errorf("expected error %s but got nil", want)
		case strings.Contains(err.Error(), want.Error()):
			err = nil
		}
		if err != nil {
			return errors.Trace(err)
		}

		if err = sc.session.CommitTxn(context.TODO()); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// sqlWithErr is a SQL statement together with the error its execution is
// expected to return; a nil expectErr means the statement must succeed.
// runTestInSchemaState compares the two with terror.ErrorEqual.
type sqlWithErr struct {
	sql       string
	expectErr error
}

// expectQuery is a query together with the rows it is expected to return.
// Each element of rows is one row in the form accepted by testkit.Rows.
type expectQuery struct {
	sql  string
	rows []string
}

// TestAppendEnum checks ALTER TABLE ... CHANGE on an enum column: shrinking the
// element list and changing the column to int are rejected, appending an
// element (and changing the default to it) succeeds, and the new element and
// default are usable from another session afterwards.
func (s *testStateChangeSuite) TestAppendEnum(c *C) {
	_, err := s.se.Execute(context.Background(), `create table t (
			c1 varchar(64),
			c2 enum('N','Y') not null default 'N',
			c3 timestamp on update current_timestamp,
			c4 int primary key,
			unique key idx2 (c2, c3))`)
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table t")
	_, err = s.se.Execute(context.Background(), "insert into t values('a', 'N', '2017-07-01', 8)")
	c.Assert(err, IsNil)
	// Make sure these sqls use the plan of index scan.
	_, err = s.se.Execute(context.Background(), "drop stats t")
	c.Assert(err, IsNil)
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)

	// Each statement below must fail. Assert NotNil first so that an unexpected
	// success is reported as a test failure instead of a nil-dereference panic.
	_, err = s.se.Execute(context.Background(), "insert into t values('a', 'A', '2018-09-19', 9)")
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[table:1366]Incorrect enum value: 'A' for column 'c2' at row 1")
	failAlterTableSQL1 := "alter table t change c2 c2 enum('N') DEFAULT 'N'"
	_, err = s.se.Execute(context.Background(), failAlterTableSQL1)
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[ddl:203]unsupported modify column the number of enum column's elements is less than the original: 2")
	failAlterTableSQL2 := "alter table t change c2 c2 int default 0"
	_, err = s.se.Execute(context.Background(), failAlterTableSQL2)
	c.Assert(err, NotNil)
	c.Assert(err.Error(), Equals, "[ddl:203]unsupported modify column charset binary not match origin utf8")

	alterTableSQL := "alter table t change c2 c2 enum('N','Y','A') DEFAULT 'A'"
	_, err = s.se.Execute(context.Background(), alterTableSQL)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "insert into t values('a', 'A', '2018-09-20', 10)")
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "insert into t (c1, c3, c4) values('a', '2018-09-21', 11)")
	c.Assert(err, IsNil)

	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test_db_state")
	result, err := s.execQuery(tk, "select c4, c2 from t order by c4 asc")
	c.Assert(err, IsNil)
	// checkResult only reports a mismatch through its return value, so it must be asserted.
	err = checkResult(result, testkit.Rows("8 N", "10 A", "11 A"))
	c.Assert(err, IsNil)

	_, err = s.se.Execute(context.Background(), "update t set c2='N' where c4 = 10")
	c.Assert(err, IsNil)
	result, err = s.execQuery(tk, "select c2 from t where c4 = 10")
	c.Assert(err, IsNil)
	// The query selects only c2 of the single row with c4 = 10.
	err = checkResult(result, testkit.Rows("N"))
	c.Assert(err, IsNil)
}

// https://github.com/hanchuanchuan/goInception/pull/6249 fixes the following two test cases.
// TestWriteOnlyWriteNULL runs an INSERT ... ON DUPLICATE KEY UPDATE that does
// not mention the NOT NULL column c5 while c5 is being added, and expects the
// row with c4 = 8 to carry c5's default value once the DDL has finished.
func (s *testStateChangeSuite) TestWriteOnlyWriteNULL(c *C) {
	stmts := []sqlWithErr{
		{sql: "insert t set c1 = 'c1_new', c3 = '2019-02-12', c4 = 8 on duplicate key update c1 = values(c1)", expectErr: nil},
	}
	want := &expectQuery{sql: "select c4, c5 from t", rows: []string{"8 1"}}
	// TODO: This case should always fail in write-only state, but it doesn't. We use write-reorganization state here to keep it running stable. It needs a double check.
	s.runTestInSchemaState(c, model.StateWriteReorganization, "",
		"alter table t add column c5 int not null default 1 after c4", stmts, want)
}

// TestWriteOnlyOnDupUpdate empties the table and then inserts the same key
// twice with ON DUPLICATE KEY UPDATE while the NOT NULL column c5 is being
// added, and expects the surviving row to carry c5's default value once the
// DDL has finished.
func (s *testStateChangeSuite) TestWriteOnlyOnDupUpdate(c *C) {
	stmts := []sqlWithErr{
		{sql: "delete from t", expectErr: nil},
		{sql: "insert t set c1 = 'c1_dup', c3 = '2018-02-12', c4 = 2 on duplicate key update c1 = values(c1)", expectErr: nil},
		{sql: "insert t set c1 = 'c1_new', c3 = '2019-02-12', c4 = 2 on duplicate key update c1 = values(c1)", expectErr: nil},
	}
	want := &expectQuery{sql: "select c4, c5 from t", rows: []string{"2 1"}}
	// TODO: This case should always fail in write-only state, but it doesn't. We use write-reorganization state here to keep it running stable. It needs a double check.
	s.runTestInSchemaState(c, model.StateWriteReorganization, "",
		"alter table t add column c5 int not null default 1 after c4", stmts, want)
}

// TestWriteOnly runs a delete, an index-hinted update and an insert while an
// "add column ... first" job is in the write-only state; all three statements
// must succeed. The original note says this guards the columns used in
// PhysicalIndexScan's ToPB function.
func (s *testStateChangeSuite) TestWriteOnly(c *C) {
	stmts := []sqlWithErr{
		{sql: "delete from t where c1 = 'a'", expectErr: nil},
		{sql: "update t use index(idx2) set c1 = 'c1_update' where c1 = 'a'", expectErr: nil},
		{sql: "insert t set c1 = 'c1_insert', c3 = '2018-02-12', c4 = 1", expectErr: nil},
	}
	s.runTestInSchemaState(c, model.StateWriteOnly, "",
		"alter table t add column c5 int not null default 1 first", stmts, nil)
}

// TestDeleteOnly runs an insert that names column c1 while a "drop column c1"
// job is in the delete-only state and expects the insert to be rejected with
// "Can't find column c1". The original note says this guards the columns used
// in PhysicalIndexScan's ToPB function.
func (s *testStateChangeSuite) TestDeleteOnly(c *C) {
	stmts := []sqlWithErr{
		{
			sql:       "insert t set c1 = 'c1_insert', c3 = '2018-02-12', c4 = 1",
			expectErr: errors.Errorf("Can't find column c1"),
		},
	}
	s.runTestInSchemaState(c, model.StateDeleteOnly, "", "alter table t drop column c1", stmts, nil)
}

// runTestInSchemaState creates table t with one row, runs alterTableSQL, and,
// when the DDL job reports the given schema state, executes every statement in
// sqlWithErrs from a second session, comparing each result with its expected
// error. If expectQuery is non-nil, its query is run after the DDL has
// finished and its rows are compared with the expected ones. tableName is not
// used.
func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaState, tableName, alterTableSQL string,
	sqlWithErrs []sqlWithErr, expectQuery *expectQuery) {
	_, err := s.se.Execute(context.Background(), `create table t (
		c1 varchar(64),
		c2 enum('N','Y') not null default 'N',
		c3 timestamp on update current_timestamp,
		c4 int primary key,
		unique key idx2 (c2, c3))`)
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table t")
	_, err = s.se.Execute(context.Background(), "insert into t values('a', 'N', '2017-07-01', 8)")
	c.Assert(err, IsNil)
	// Make sure these sqls use the plan of index scan.
	_, err = s.se.Execute(context.Background(), "drop stats t")
	c.Assert(err, IsNil)

	callback := &ddl.TestDDLCallback{}
	// prevState is never reassigned, so the hook's first condition only filters
	// out updates whose schema state is StateNone.
	prevState := model.StateNone
	var checkErr error
	// times caps the number of hook invocations that do any work at three.
	times := 0
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	callback.OnJobUpdatedExported = func(job *model.Job) {
		if job.SchemaState == prevState || checkErr != nil || times >= 3 {
			return
		}
		times++
		if job.SchemaState != state {
			return
		}
		for _, sqlWithErr := range sqlWithErrs {
			// Use a hook-local error so the hook never writes the enclosing
			// function's err variable.
			_, execErr := se.Execute(context.Background(), sqlWithErr.sql)
			if terror.ErrorEqual(execErr, sqlWithErr.expectErr) {
				continue
			}
			checkErr = execErr
			if checkErr == nil {
				// The statement succeeded although an error was expected. Storing
				// the nil error would hide the mismatch, so record an explicit one.
				checkErr = errors.Errorf("sql %q: expected error %v but got nil",
					sqlWithErr.sql, sqlWithErr.expectErr)
			}
			break
		}
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)
	_, err = s.se.Execute(context.Background(), alterTableSQL)
	c.Assert(err, IsNil)
	c.Assert(errors.ErrorStack(checkErr), Equals, "")
	// Replace the hook with an empty callback so later tests are unaffected.
	callback = &ddl.TestDDLCallback{}
	d.(ddl.DDLForTest).SetHook(callback)

	if expectQuery != nil {
		tk := testkit.NewTestKit(c, s.store)
		tk.MustExec("use test_db_state")
		result, err := s.execQuery(tk, expectQuery.sql)
		c.Assert(err, IsNil)
		err = checkResult(result, testkit.Rows(expectQuery.rows...))
		c.Assert(err, IsNil)
	}
}

// execQuery runs sql with args through tk and converts the record set into a
// testkit.Result. Unlike tk.MustQuery, an execution error is returned to the
// caller instead of being asserted on.
func (s *testStateChangeSuite) execQuery(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
	rs, err := tk.Exec(sql, args...)
	if err != nil {
		return nil, err
	}
	return tk.ResultSetToResult(rs, Commentf("sql:%s, args:%v", sql, args)), nil
}

// checkResult compares the rows of result with expected through their "%s"
// formatting and returns an error describing both sides when they differ.
// Nothing is asserted here; callers must check the returned error.
func checkResult(result *testkit.Result, expected [][]interface{}) error {
	need := fmt.Sprintf("%s", expected)
	if got := fmt.Sprintf("%s", result.Rows()); got != need {
		return fmt.Errorf("need %v, but got %v", need, got)
	}
	return nil
}

// CheckResult runs sql with args through tk and returns the result set as a
// testkit.Result, or the execution error. It performs exactly the steps of
// execQuery, so it simply delegates to it.
func (s *testStateChangeSuite) CheckResult(tk *testkit.TestKit, sql string, args ...interface{}) (*testkit.Result, error) {
	return s.execQuery(tk, sql, args...)
}

// TestShowIndex checks the output of "show index": an index that is still
// being added (delete-only, write-only or write-reorganization state) must not
// be listed, it must be listed once the DDL has finished, and an index created
// on a range-partitioned table must be listed as well.
func (s *testStateChangeSuite) TestShowIndex(c *C) {
	_, err := s.se.Execute(context.Background(), `create table t(c1 int primary key, c2 int)`)
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table t")

	callback := &ddl.TestDDLCallback{}
	// prevState is never reassigned, so the hook's first condition only filters
	// out updates whose schema state is StateNone.
	prevState := model.StateNone
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test_db_state")
	showIndexSQL := `show index from t`
	var checkErr error
	callback.OnJobUpdatedExported = func(job *model.Job) {
		if job.SchemaState == prevState || checkErr != nil {
			return
		}
		switch job.SchemaState {
		case model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization:
			result, err1 := s.execQuery(tk, showIndexSQL)
			if err1 != nil {
				checkErr = err1
				break
			}
			// Only the primary key may be visible while c2 is being added.
			checkErr = checkResult(result, testkit.Rows("t 0 PRIMARY 1 c1 A 0 <nil> <nil>  BTREE  "))
		}
	}

	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)
	alterTableSQL := `alter table t add index c2(c2)`
	_, err = s.se.Execute(context.Background(), alterTableSQL)
	c.Assert(err, IsNil)
	c.Assert(errors.ErrorStack(checkErr), Equals, "")

	result, err := s.execQuery(tk, showIndexSQL)
	c.Assert(err, IsNil)
	err = checkResult(result, testkit.Rows("t 0 PRIMARY 1 c1 A 0 <nil> <nil>  BTREE  ", "t 1 c2 1 c2 A 0 <nil> <nil> YES BTREE  "))
	c.Assert(err, IsNil)
	// Replace the hook with an empty callback so the statements below and later
	// tests are unaffected.
	callback = &ddl.TestDDLCallback{}
	d.(ddl.DDLForTest).SetHook(callback)

	_, err = s.se.Execute(context.Background(), "set @@tidb_enable_table_partition = 1")
	c.Assert(err, IsNil)

	_, err = s.se.Execute(context.Background(), `create table tr(
		id int, name varchar(50),
		purchased date
	)
	partition by range( year(purchased) ) (
    	partition p0 values less than (1990),
    	partition p1 values less than (1995),
    	partition p2 values less than (2000),
    	partition p3 values less than (2005),
    	partition p4 values less than (2010),
    	partition p5 values less than (2015)
   	);`)
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table tr")
	_, err = s.se.Execute(context.Background(), "create index idx1 on tr (purchased);")
	c.Assert(err, IsNil)
	result, err = s.execQuery(tk, "show index from tr;")
	c.Assert(err, IsNil)
	// tr has exactly one index. Its Null column is expected to read YES because
	// purchased is nullable, as for index c2 on the nullable column t.c2 above.
	err = checkResult(result, testkit.Rows("tr 1 idx1 1 purchased A 0 <nil> <nil> YES BTREE  "))
	c.Assert(err, IsNil)
}

// TestParallelAlterModifyColumn issues the same MODIFY COLUMN ... FIRST
// statement from two sessions; both must succeed and the table must still be
// readable afterwards.
func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
	const modifySQL = "ALTER TABLE t MODIFY COLUMN b int FIRST;"
	check := func(c *C, err1, err2 error) {
		c.Assert(err1, IsNil)
		c.Assert(err2, IsNil)
		_, selErr := s.se.Execute(context.Background(), "select * from t")
		c.Assert(selErr, IsNil)
	}
	s.testControlParallelExecSQL(c, modifySQL, modifySQL, check)
}

// TestParallelChangeColumnName renames two different columns to the same new
// name from two sessions; exactly one of the two DDLs must fail, with a
// duplicate-column-name error.
func (s *testStateChangeSuite) TestParallelChangeColumnName(c *C) {
	sql1 := "ALTER TABLE t CHANGE a aa int;"
	sql2 := "ALTER TABLE t CHANGE b aa int;"
	f := func(c *C, err1, err2 error) {
		// Make sure only one DDL encounters the error of 'duplicate column name'.
		var oneErr error
		switch {
		case err1 != nil && err2 == nil:
			oneErr = err1
		case err1 == nil && err2 != nil:
			oneErr = err2
		}
		// oneErr stays nil when both or neither statement failed. Assert it first
		// so that case is reported as a failure instead of a nil-dereference panic.
		c.Assert(oneErr, NotNil, Commentf("err1: %v, err2: %v", err1, err2))
		c.Assert(oneErr.Error(), Equals, "[schema:1060]Duplicate column name 'aa'")
	}
	s.testControlParallelExecSQL(c, sql1, sql2, f)
}

// TestParallelAlterAddIndex creates two indexes with the same name from two
// sessions; the first statement must succeed and the second must fail because
// the index already exists.
func (s *testStateChangeSuite) TestParallelAlterAddIndex(c *C) {
	sql1 := "ALTER TABLE t add index index_b(b);"
	sql2 := "CREATE INDEX index_b ON t (c);"
	f := func(c *C, err1, err2 error) {
		c.Assert(err1, IsNil)
		// Guard against a nil err2 so an unexpected success fails cleanly rather
		// than panicking in err2.Error().
		c.Assert(err2, NotNil)
		c.Assert(err2.Error(), Equals, "[ddl:1061]index already exist index_b")
	}
	s.testControlParallelExecSQL(c, sql1, sql2, f)
}

// TestParallelDropColumn drops the same column from two sessions; the first
// statement must succeed and the second must fail because the column no longer
// exists.
func (s *testStateChangeSuite) TestParallelDropColumn(c *C) {
	sql := "ALTER TABLE t drop COLUMN c ;"
	f := func(c *C, err1, err2 error) {
		c.Assert(err1, IsNil)
		// Guard against a nil err2 so an unexpected success fails cleanly rather
		// than panicking in err2.Error().
		c.Assert(err2, NotNil)
		c.Assert(err2.Error(), Equals, "[ddl:1091]column c doesn't exist")
	}
	s.testControlParallelExecSQL(c, sql, sql, f)
}

// TestParallelCreateAndRename creates table t_exists in one session while
// another session renames t to t_exists; the create must succeed and the
// rename must fail because the target table already exists.
func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) {
	sql1 := "create table t_exists(c int);"
	sql2 := "alter table t rename to t_exists;"
	defer s.se.Execute(context.Background(), "drop table t_exists")
	f := func(c *C, err1, err2 error) {
		c.Assert(err1, IsNil)
		// Guard against a nil err2 so an unexpected success fails cleanly rather
		// than panicking in err2.Error().
		c.Assert(err2, NotNil)
		c.Assert(err2.Error(), Equals, "[schema:1050]Table 't_exists' already exists")
	}
	s.testControlParallelExecSQL(c, sql1, sql2, f)
}

// checkRet verifies the errors returned by the two statements that
// testControlParallelExecSQL runs: err1 belongs to sql1 and err2 to sql2.
type checkRet func(c *C, err1, err2 error)

// testControlParallelExecSQL creates table t(a, b, c) in test_db_state, runs
// sql1 and sql2 from two separate sessions in two goroutines, and passes the
// two resulting errors to f. A DDL hook blocks on its first invocation until
// admin.GetDDLJobs reports exactly two jobs. sql2 is started about 10ms after
// the goroutine that runs sql1 has started.
func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
	_, err := s.se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int)")
	c.Assert(err, IsNil)
	defer s.se.Execute(context.Background(), "drop table t")

	callback := &ddl.TestDDLCallback{}
	// times makes the waiting loop below run for the first hook invocation only.
	times := 0
	callback.OnJobUpdatedExported = func(job *model.Job) {
		if times != 0 {
			return
		}
		var qLen int
		// Poll every 5ms until exactly two DDL jobs are reported.
		// NOTE(review): the error returned by kv.RunInNewTxn is ignored and the
		// loop has no timeout, so it spins for as long as qLen is not 2.
		for {
			kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
				jobs, err1 := admin.GetDDLJobs(txn)
				if err1 != nil {
					return err1
				}
				qLen = len(jobs)
				return nil
			})
			if qLen == 2 {
				break
			}
			time.Sleep(5 * time.Millisecond)
		}
		times++
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)

	wg := sync.WaitGroup{}
	var err1 error
	var err2 error
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	se1, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se1.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	wg.Add(2)
	// ch is closed by the first goroutine right before it runs sql1; the second
	// goroutine waits for that signal.
	ch := make(chan struct{})
	go func() {
		defer wg.Done()
		close(ch)
		_, err1 = se.Execute(context.Background(), sql1)
	}()
	go func() {
		defer wg.Done()
		<-ch
		// Sleep so that sql2 is issued after sql1. This is a timing heuristic,
		// not a guarantee.
		time.Sleep(time.Millisecond * 10)
		_, err2 = se1.Execute(context.Background(), sql2)
	}()

	wg.Wait()
	f(c, err1, err2)

	// Replace the hook with an empty callback so later tests are unaffected.
	callback = &ddl.TestDDLCallback{}
	d.(ddl.DDLForTest).SetHook(callback)
}

// testParallelExecSQL runs sql concurrently from two separate sessions on
// test_db_state and asserts that neither execution returns an error. A DDL
// hook sleeps 10ms on its first invocation to give the other job time to be
// enqueued.
func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
	// Check each error before the value it guards is used: a failed
	// CreateSession must not be masked by the following statement.
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)

	se1, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se1.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)

	var err2, err3 error
	wg := sync.WaitGroup{}

	callback := &ddl.TestDDLCallback{}
	once := sync.Once{}
	callback.OnJobUpdatedExported = func(job *model.Job) {
		// sleep a while, let other job enqueue.
		once.Do(func() {
			time.Sleep(time.Millisecond * 10)
		})
	}

	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetHook(callback)

	wg.Add(2)
	go func() {
		defer wg.Done()
		_, err2 = se.Execute(context.Background(), sql)
	}()

	go func() {
		defer wg.Done()
		_, err3 = se1.Execute(context.Background(), sql)
	}()
	wg.Wait()
	c.Assert(err2, IsNil)
	c.Assert(err3, IsNil)
	// Replace the hook with an empty callback so later tests are unaffected.
	callback = &ddl.TestDDLCallback{}
	d.(ddl.DDLForTest).SetHook(callback)
}

// TestCreateTableIfNotExists runs the same "create table if not exists"
// statement from two sessions in parallel; neither execution may return an
// error. The table is dropped when the test ends.
func (s *testStateChangeSuite) TestCreateTableIfNotExists(c *C) {
	const createSQL = "create table if not exists test_not_exists(a int);"
	defer s.se.Execute(context.Background(), "drop table test_not_exists")
	s.testParallelExecSQL(c, createSQL)
}

// TestCreateDBIfNotExists runs the same "create database if not exists"
// statement from two sessions in parallel; neither execution may return an
// error. The database is dropped when the test ends.
func (s *testStateChangeSuite) TestCreateDBIfNotExists(c *C) {
	const createSQL = "create database if not exists test_not_exists;"
	defer s.se.Execute(context.Background(), "drop database test_not_exists")
	s.testParallelExecSQL(c, createSQL)
}

// TestParallelDDLBeforeRunDDLJob tests a session to execute DDL with an outdated information schema.
// This test is used to simulate the following conditions:
// In a cluster, TiDB "a" executes the DDL.
// TiDB "b" fails to load schema, then TiDB "b" executes the DDL statement associated with the DDL statement executed by "a".
//
// Connection 1 drops column c2 and must succeed. Connection 2 adds column c2
// with the information schema it obtained before that drop, and must fail with
// an error containing "Information schema is changed".
func (s *testStateChangeSuite) TestParallelDDLBeforeRunDDLJob(c *C) {
	defer s.se.Execute(context.Background(), "drop table test_table")
	_, err := s.se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(context.Background(), "create table test_table (c1 int, c2 int default 1, index (c1))")
	c.Assert(err, IsNil)

	// Create two sessions.
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	se1, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se1.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)

	intercept := &ddl.TestInterceptor{}
	firstConnID := uint64(1)
	// finishedCnt is the value stored into sessionCnt once the first connection's DDL has returned.
	finishedCnt := int32(0)
	interval := 5 * time.Millisecond
	var sessionCnt int32 // sessionCnt is the number of sessions that have entered the OnGetInfoSchema interceptor.
	intercept.OnGetInfoSchemaExported = func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
		// The following code is for testing.
		// Make sure the two sessions get the same information schema before executing DDL.
		// After the first session executes its DDL, then the second session executes its DDL.
		var info infoschema.InfoSchema
		atomic.AddInt32(&sessionCnt, 1)
		for {
			// Make sure there are two sessions running here.
			if atomic.LoadInt32(&sessionCnt) == 2 {
				info = is
				break
			}
			time.Sleep(interval)
		}

		currID := ctx.GetSessionVars().ConnectionID
		for {
			seCnt := atomic.LoadInt32(&sessionCnt)
			// Both sessions have obtained their information schema at this point. The first
			// session continues immediately; any other session waits until the first session
			// has finished its SQL (seCnt == finishedCnt).
			if currID == firstConnID || seCnt == finishedCnt {
				break
			}
			time.Sleep(interval)
		}

		return info
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetInterceptoror(intercept)

	// Make sure the connection 1 executes a SQL before the connection 2.
	// And the connection 2 executes a SQL with an outdated information schema.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()

		se.SetConnectionID(firstConnID)
		_, err1 := se.Execute(context.Background(), "alter table test_table drop column c2")
		c.Assert(err1, IsNil)
		// Signal the interceptor that the first connection is done, which releases the second one.
		// NOTE(review): this store is only reached after the assertion above; if that assertion
		// stops this goroutine on failure, the second connection would keep waiting — confirm.
		atomic.StoreInt32(&sessionCnt, finishedCnt)
	}()
	go func() {
		defer wg.Done()

		se1.SetConnectionID(2)
		_, err2 := se1.Execute(context.Background(), "alter table test_table add column c2 int")
		c.Assert(err2, NotNil)
		c.Assert(strings.Contains(err2.Error(), "Information schema is changed"), IsTrue)
	}()

	wg.Wait()

	// Replace the interceptor with an empty one so later tests are unaffected.
	intercept = &ddl.TestInterceptor{}
	d.(ddl.DDLForTest).SetInterceptoror(intercept)
}
